package com.z.demo

import org.apache.flink.table.api._

/**
 * @Author wenz.ma
 * @Date 2021/10/27 13:47
 * @Desc table api 中 groupBy的使用
 */
object TableApiGroupBy01 {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(
      """
        |create table student (
        |id int,
        |name string,
        |sex string
        |)with(
        | 'connector' = 'kafka',
        | 'topic' = 'test-topic',
        | 'properties.bootstrap.servers' = 'server120:9092',
        | 'properties.group.id' = 'testGroup',
        | 'scan.startup.mode' = 'latest-offset',
        | 'format' = 'csv'
        |)
        |""".stripMargin)

    tEnv.from("student")
      .groupBy('sex)
      .select(
        'sex,
        'name.count as "name_cnt",
        'sex.count as "sex_cnt")
      .execute()
      .print()
  }
}
